{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "(horovod)=\n",
    "# MPIJob and Horovod runtime\n",
    "\n",
    "## Running distributed workloads\n",
    "\n",
    "Training a Deep Neural Network is a hard task.  With growing datasets, wider and deeper networks, training our Neural Network can require a lot of resources (CPUs / GPUs / Mem and Time).  \n",
    "\n",
    "There are two main reasons why we would like to distribute our Deep Learning workloads:  \n",
    "\n",
    "1. **Model Parallelism** &mdash; The **Model** is too big to fit a single GPU.  \n",
    "In this case the model contains too many parameters to hold within a single GPU.  \n",
    "To negate this we can use strategies like **Parameter Server** or slicing the model into slices of consecutive layers which we can fit in a single GPU.  \n",
    "Both strategies require **Synchronization** between the layers held on different GPUs / Parameter Server shards.  \n",
    "\n",
    "2. **Data Parallelism** &mdash; The **Dataset** is too big to fit a single GPU.  \n",
    "Using methods like **Stochastic Gradient Descent** we can send batches of data to our models for gradient estimation. This comes at the cost of longer time to converge since the estimated gradient may not fully represent the actual gradient.  \n",
    "To increase the likelihood of estimating the actual gradient we could use bigger batches, by sending small batches to different GPUs running the same Neural Network, calculating the batch gradient and then running a **Synchronization Step** to calculate the average gradient over the batches and update the Neural Networks running on the different GPUs.  \n",
    "\n",
    "\n",
    "> It is important to understand that the act of distribution adds extra **Synchronization Costs** which may vary according to your cluster's configuration.  \n",
    "> <br>\n",
    "> As the gradients and NN needs to be propagated to each GPU in the cluster every epoch (or a number of steps), Networking can become a bottleneck and sometimes different configurations need to be used for optimal performance.  \n",
    "> <br>\n",
    "> **Scaling Efficiency** is the metric used to show by how much each additional GPU should benefit the training process with Horovod showing up to 90% (When running with a well written code and good parameters).\n",
    "\n",
    "![Horovod scaling](https://user-images.githubusercontent.com/16640218/38965607-bf5c46ca-4332-11e8-895a-b9c137e86013.png)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## How can we distribute our training?\n",
    "There are two different cluster configurations (which can be combined) we need to take into account.  \n",
    "- **Multi Node** &mdash; GPUs are distributed over multiple nodes in the cluster.  \n",
    "- **Multi GPU** &mdash; GPUs are within a single Node.  \n",
    "\n",
    "In this demo we show a **Multi Node Multi GPU** &mdash; **Data Parallel** enabled training using Horovod.  \n",
    "However, you should always try and use the best distribution strategy for your use case (due to the added costs of the distribution itself, ability to run in an optimized way on specific hardware or other considerations that may arise)."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## How Horovod works?\n",
    "Horovod's primary motivation is to make it easy to take a single-GPU training script and successfully scale it to train across many GPUs in parallel. This has two aspects:\n",
    "\n",
    "- How much modification does one have to make to a program to make it distributed, and how easy is it to run it?\n",
    "- How much faster would it run in distributed mode?\n",
    "\n",
    "Horovod Supports TensorFlow, Keras, PyTorch, and Apache MXNet.\n",
    "\n",
    "in MLRun we use Horovod with MPI in order to create cluster resources and allow for optimized networking.  \n",
    "**Note:** Horovod and MPI may use [NCCL](https://developer.nvidia.com/nccl) when applicable which may require some specific configuration arguments to run optimally.\n",
    "\n",
    "Horovod uses this MPI and NCCL concepts for distributed computation and messaging to quickly and easily synchronize between the different nodes or GPUs.\n",
    "\n",
    "![Ring Allreduce Strategy](https://miro.medium.com/max/700/1*XdMlfmOgPCUG9ZOYLTeP9w.jpeg)\n",
    "\n",
    "Horovod will run your code on all the given nodes (Specific node can be addressed via `hvd.rank()`) while using an `hvd.DistributedOptimizer` wrapper to run the **synchronization cycles** between the copies of your Neural Network running at each node.  \n",
    "\n",
    "**Note:** Since all the copies of your Neural Network must be the same, Your workers will adjust themselves to the rate of the slowest worker (simply by waiting for it to finish the epoch and receive its updates). Thus try not to make a specific worker do a lot of additional work on each epoch (Like a lot of saving, extra calculations, etc...) since this can affect the overall training time."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## How do we integrate TF2 with Horovod?\n",
    "As it's one of the main motivations, integration is fairly easy and requires only a few steps: ([You can read the full instructions for all the different frameworks on Horovod's documentation website](https://horovod.readthedocs.io/en/stable/tensorflow.html)).  \n",
    "\n",
    "1. Run `hvd.init()`.  \n",
    "2. Pin each GPU to a single process.\n",
    "With the typical setup of one GPU per process, set this to local rank. The first process on the server will be allocated the first GPU, the second process will be allocated the second GPU, and so forth.  \n",
    "```\n",
    "gpus = tf.config.experimental.list_physical_devices('GPU')\n",
    "for gpu in gpus:\n",
    "    tf.config.experimental.set_memory_growth(gpu, True)\n",
    "if gpus:\n",
    "    tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')\n",
    "```\n",
    "3. Scale the learning rate by the number of workers.  \n",
    "Effective batch size in synchronous distributed training is scaled by the number of workers. An increase in learning rate compensates for the increased batch size.\n",
    "4. Wrap the optimizer in `hvd.DistributedOptimizer`.  \n",
    "The distributed optimizer delegates gradient computation to the original optimizer, averages gradients using allreduce or allgather, and then applies those averaged gradients.  \n",
    "For TensorFlow v2, when using a `tf.GradientTape`, wrap the tape in `hvd.DistributedGradientTape` instead of wrapping the optimizer.\n",
    "1. Broadcast the initial variable states from rank 0 to all other processes.  \n",
    "This is necessary to ensure consistent initialization of all workers when training is started with random weights or restored from a checkpoint.  \n",
    "For TensorFlow v2, use `hvd.broadcast_variables` after models and optimizers have been initialized.\n",
    "1. Modify your code to save checkpoints only on worker 0 to prevent other workers from corrupting them.  \n",
    "For TensorFlow v2, construct a `tf.train.Checkpoint` and only call `checkpoint.save()` when `hvd.rank() == 0`.\n",
    "\n",
    "\n",
    "You can go to [Horovod's Documentation](https://horovod.readthedocs.io/en/stable) to read more about horovod."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Image classification use case"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "See the end to end [**Image Classification with Distributed Training Demo**](https://github.com/mlrun/demos/tree/1.0.x/mask-detection)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.7"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
